{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import Row\n",
    "from pyspark.sql.types import *\n",
    "from pyspark.sql.functions import *\n",
    "spark = SparkSession.builder.appName(\n",
    "    \"HelloSpark\").master(\"local\").getOrCreate()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# rdd -> dataframe\n",
    "rdd = spark.sparkContext.parallelize([(\"jayChou\", 41), (\"burukeyou\", 23)])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "ename": "ParseException",
     "evalue": "\"\\nmismatched input '<EOF>' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'ANY', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'PIVOT', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF', 'POSITION', 'EXTRACT', 'DIV', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 4)\\n\\n== SQL ==\\nname\\n----^^^\\n\"",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mPy4JJavaError\u001b[0m                             Traceback (most recent call last)",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m     62\u001b[0m         \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 63\u001b[1;33m             \u001b[1;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     64\u001b[0m         \u001b[1;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\py4j\\protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[1;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[0;32m    327\u001b[0m                     \u001b[1;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 328\u001b[1;33m                     format(target_id, \".\", name), value)\n\u001b[0m\u001b[0;32m    329\u001b[0m             \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType.\n: org.apache.spark.sql.catalyst.parser.ParseException: \nDataType name is not supported.(line 1, pos 0)\n\n== SQL ==\nname\n^^^\n\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitPrimitiveDataType$1.apply(AstBuilder.scala:1769)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitPrimitiveDataType$1.apply(AstBuilder.scala:1747)\r\n\tat org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:108)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder.visitPrimitiveDataType(AstBuilder.scala:1747)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder.visitPrimitiveDataType(AstBuilder.scala:49)\r\n\tat org.apache.spark.sql.catalyst.parser.SqlBaseParser$PrimitiveDataTypeContext.accept(SqlBaseParser.java:14452)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder.typedVisit(AstBuilder.scala:55)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder.org$apache$spark$sql$catalyst$parser$AstBuilder$$visitSparkDataType(AstBuilder.scala:1741)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleDataType$1.apply(AstBuilder.scala:90)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleDataType$1.apply(AstBuilder.scala:90)\r\n\tat org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:108)\r\n\tat org.apache.spark.sql.catalyst.parser.AstBuilder.visitSingleDataType(AstBuilder.scala:89)\r\n\tat org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parseDataType$1.apply(ParseDriver.scala:40)\r\n\tat org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parseDataType$1.apply(ParseDriver.scala:39)\r\n\tat org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:100)\r\n\tat org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:39)\r\n\tat org.apache.spark.sql.api.python.PythonSQLUtils$.parseDataType(PythonSQLUtils.scala:34)\r\n\tat org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(PythonSQLUtils.scala)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\r\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\r\n\tat java.lang.reflect.Method.invoke(Method.java:498)\r\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\r\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\r\n\tat py4j.Gateway.invoke(Gateway.java:282)\r\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\r\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\r\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\r\n\tat java.lang.Thread.run(Thread.java:748)\r\n",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[1;31mParseException\u001b[0m                            Traceback (most recent call last)",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36m_parse_datatype_string\u001b[1;34m(s)\u001b[0m\n\u001b[0;32m    813\u001b[0m             \u001b[1;31m# For backwards compatibility, \"integer\", \"struct<fieldname: datatype>\" and etc.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 814\u001b[1;33m             \u001b[1;32mreturn\u001b[0m \u001b[0mfrom_ddl_datatype\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    815\u001b[0m         \u001b[1;32mexcept\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36mfrom_ddl_datatype\u001b[1;34m(type_str)\u001b[0m\n\u001b[0;32m    805\u001b[0m         return _parse_datatype_json_string(\n\u001b[1;32m--> 806\u001b[1;33m             sc._jvm.org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(type_str).json())\n\u001b[0m\u001b[0;32m    807\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\py4j\\java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[1;34m(self, *args)\u001b[0m\n\u001b[0;32m   1256\u001b[0m         return_value = get_return_value(\n\u001b[1;32m-> 1257\u001b[1;33m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[0;32m   1258\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m     72\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'org.apache.spark.sql.catalyst.parser.ParseException: '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 73\u001b[1;33m                 \u001b[1;32mraise\u001b[0m \u001b[0mParseException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m': '\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     74\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'org.apache.spark.sql.streaming.StreamingQueryException: '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mParseException\u001b[0m: '\\nDataType name is not supported.(line 1, pos 0)\\n\\n== SQL ==\\nname\\n^^^\\n'",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[1;31mPy4JJavaError\u001b[0m                             Traceback (most recent call last)",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m     62\u001b[0m         \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 63\u001b[1;33m             \u001b[1;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     64\u001b[0m         \u001b[1;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\py4j\\protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[1;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[0;32m    327\u001b[0m                     \u001b[1;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 328\u001b[1;33m                     format(target_id, \".\", name), value)\n\u001b[0m\u001b[0;32m    329\u001b[0m             \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType.\n: org.apache.spark.sql.catalyst.parser.ParseException: \nmismatched input '>' expecting ':'(line 1, pos 11)\n\n== SQL ==\nstruct<name>\n-----------^^^\n\r\n\tat org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)\r\n\tat org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)\r\n\tat org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:39)\r\n\tat org.apache.spark.sql.api.python.PythonSQLUtils$.parseDataType(PythonSQLUtils.scala:34)\r\n\tat org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(PythonSQLUtils.scala)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\r\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\r\n\tat java.lang.reflect.Method.invoke(Method.java:498)\r\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\r\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\r\n\tat py4j.Gateway.invoke(Gateway.java:282)\r\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\r\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\r\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\r\n\tat java.lang.Thread.run(Thread.java:748)\r\n",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[1;31mParseException\u001b[0m                            Traceback (most recent call last)",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36m_parse_datatype_string\u001b[1;34m(s)\u001b[0m\n\u001b[0;32m    817\u001b[0m                 \u001b[1;31m# For backwards compatibility, \"fieldname: datatype, fieldname: datatype\" case.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 818\u001b[1;33m                 \u001b[1;32mreturn\u001b[0m \u001b[0mfrom_ddl_datatype\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"struct<%s>\"\u001b[0m \u001b[1;33m%\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstrip\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    819\u001b[0m             \u001b[1;32mexcept\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36mfrom_ddl_datatype\u001b[1;34m(type_str)\u001b[0m\n\u001b[0;32m    805\u001b[0m         return _parse_datatype_json_string(\n\u001b[1;32m--> 806\u001b[1;33m             sc._jvm.org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(type_str).json())\n\u001b[0m\u001b[0;32m    807\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\py4j\\java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[1;34m(self, *args)\u001b[0m\n\u001b[0;32m   1256\u001b[0m         return_value = get_return_value(\n\u001b[1;32m-> 1257\u001b[1;33m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[0;32m   1258\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m     72\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'org.apache.spark.sql.catalyst.parser.ParseException: '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 73\u001b[1;33m                 \u001b[1;32mraise\u001b[0m \u001b[0mParseException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m': '\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     74\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'org.apache.spark.sql.streaming.StreamingQueryException: '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mParseException\u001b[0m: \"\\nmismatched input '>' expecting ':'(line 1, pos 11)\\n\\n== SQL ==\\nstruct<name>\\n-----------^^^\\n\"",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[1;31mParseException\u001b[0m                            Traceback (most recent call last)",
      "\u001b[1;32m~\\AppData\\Local\\Temp\\ipykernel_21256\\4161229752.py\u001b[0m in \u001b[0;36m<module>\u001b[1;34m\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mdataframe\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mrdd\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtoDF\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"name\"\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;34m\"age\"\u001b[0m\u001b[1;33m)\u001b[0m\u001b[0;31m\\\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\session.py\u001b[0m in \u001b[0;36mtoDF\u001b[1;34m(self, schema, sampleRatio)\u001b[0m\n\u001b[0;32m     56\u001b[0m         \u001b[1;33m[\u001b[0m\u001b[0mRow\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mname\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;34mu'Alice'\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mage\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     57\u001b[0m         \"\"\"\n\u001b[1;32m---> 58\u001b[1;33m         \u001b[1;32mreturn\u001b[0m \u001b[0msparkSession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcreateDataFrame\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mschema\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0msampleRatio\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     59\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     60\u001b[0m     \u001b[0mRDD\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtoDF\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mtoDF\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\session.py\u001b[0m in \u001b[0;36mcreateDataFrame\u001b[1;34m(self, data, schema, samplingRatio, verifySchema)\u001b[0m\n\u001b[0;32m    673\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    674\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mschema\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mbasestring\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 675\u001b[1;33m             \u001b[0mschema\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0m_parse_datatype_string\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mschema\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    676\u001b[0m         \u001b[1;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mschema\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m(\u001b[0m\u001b[0mlist\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtuple\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    677\u001b[0m             \u001b[1;31m# Must re-encode any unicode strings to be consistent with StructField names\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36m_parse_datatype_string\u001b[1;34m(s)\u001b[0m\n\u001b[0;32m    818\u001b[0m                 \u001b[1;32mreturn\u001b[0m \u001b[0mfrom_ddl_datatype\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"struct<%s>\"\u001b[0m \u001b[1;33m%\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstrip\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    819\u001b[0m             \u001b[1;32mexcept\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 820\u001b[1;33m                 \u001b[1;32mraise\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    821\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    822\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36m_parse_datatype_string\u001b[1;34m(s)\u001b[0m\n\u001b[0;32m    808\u001b[0m     \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    809\u001b[0m         \u001b[1;31m# DDL format, \"fieldname datatype, fieldname datatype\".\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 810\u001b[1;33m         \u001b[1;32mreturn\u001b[0m \u001b[0mfrom_ddl_schema\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    811\u001b[0m     \u001b[1;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    812\u001b[0m         \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\types.py\u001b[0m in \u001b[0;36mfrom_ddl_schema\u001b[1;34m(type_str)\u001b[0m\n\u001b[0;32m    800\u001b[0m     \u001b[1;32mdef\u001b[0m \u001b[0mfrom_ddl_schema\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtype_str\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    801\u001b[0m         return _parse_datatype_json_string(\n\u001b[1;32m--> 802\u001b[1;33m             sc._jvm.org.apache.spark.sql.types.StructType.fromDDL(type_str).json())\n\u001b[0m\u001b[0;32m    803\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    804\u001b[0m     \u001b[1;32mdef\u001b[0m \u001b[0mfrom_ddl_datatype\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtype_str\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\py4j\\java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[1;34m(self, *args)\u001b[0m\n\u001b[0;32m   1255\u001b[0m         \u001b[0manswer\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m   1256\u001b[0m         return_value = get_return_value(\n\u001b[1;32m-> 1257\u001b[1;33m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[0;32m   1258\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m   1259\u001b[0m         \u001b[1;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32md:\\Users\\Min\\Anaconda3\\envs\\pyspark2\\lib\\site-packages\\pyspark\\sql\\utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m     71\u001b[0m                 \u001b[1;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m': '\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     72\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'org.apache.spark.sql.catalyst.parser.ParseException: '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 73\u001b[1;33m                 \u001b[1;32mraise\u001b[0m \u001b[0mParseException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m': '\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     74\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'org.apache.spark.sql.streaming.StreamingQueryException: '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     75\u001b[0m                 \u001b[1;32mraise\u001b[0m \u001b[0mStreamingQueryException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0ms\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m': '\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mParseException\u001b[0m: \"\\nmismatched input '<EOF>' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'ANY', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'PIVOT', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF', 'POSITION', 'EXTRACT', 'DIV', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 4)\\n\\n== SQL ==\\nname\\n----^^^\\n\""
     ]
    }
   ],
   "source": [
    "dataframe = rdd.toDF(\"name\",\"age\")\\"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. 转换\n",
    "2. 过滤\n",
    "3. 聚合\n",
    "4. 切分\n",
    "5. 排序\n",
    "6. 分区\n",
    "7. 去重\n",
    "8. 集合操作"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.7.13 ('pyspark2')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.13"
  },
  "orig_nbformat": 4,
  "vscode": {
   "interpreter": {
    "hash": "48e87896ed641b154b37119b199fe5cbef001faa1d3cf1f339fb46433cd23b9b"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
